# Computations
import numpy as np
import pandas as pd
from scipy.stats import norm
# preprocessing
from sklearn.preprocessing import StandardScaler
# sklearn
from sklearn import metrics
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
# Pytorch
import torch
from torch.autograd import Variable
import torch.nn as nn
import torchvision.transforms as transforms
# Visualisation libraries
## Progress Bar
import progressbar
## Text
from colorama import Fore, Back, Style
from IPython.display import Image, display, Markdown, Latex, clear_output
## plotly
from plotly.offline import init_notebook_mode, iplot
import plotly.graph_objs as go
import plotly.offline as py
from plotly.subplots import make_subplots
import plotly.express as px
## seaborn
import seaborn as sns
# Global seaborn theme: white grid, paper-sized fonts.
sns.set_style("whitegrid")
sns.set_context("paper", rc={"font.size":12,"axes.titlesize":14,"axes.labelsize":12})
## matplotlib
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
from matplotlib.patches import Ellipse, Polygon
import matplotlib.gridspec as gridspec
import matplotlib.colors
from pylab import rcParams
plt.style.use('seaborn-whitegrid')
import matplotlib as mpl
# Default figure size and font sizes for all matplotlib plots below.
mpl.rcParams['figure.figsize'] = (17, 6)
mpl.rcParams['axes.labelsize'] = 14
mpl.rcParams['xtick.labelsize'] = 12
mpl.rcParams['ytick.labelsize'] = 12
mpl.rcParams['text.color'] = 'k'
%matplotlib inline
import warnings
# Suppress library warnings so the notebook output stays readable.
warnings.filterwarnings("ignore")
In this article, we analyze and predict customer churn for Telco Customer Churn data.
| Columns | Description |
|---|---|
| customerID | Customer ID |
| gender | Whether the customer is a male or a female |
| SeniorCitizen | Whether the customer is a senior citizen or not (1, 0) |
| Partner | Whether the customer has a partner or not (Yes, No) |
| Dependents | Whether the customer has dependents or not (Yes, No) |
| tenure | Number of months the customer has stayed with the company |
| PhoneService | Whether the customer has a phone service or not (Yes, No) |
| MultipleLines | Whether the customer has multiple lines or not (Yes, No, No phone service) |
| InternetService | Customer’s internet service provider (DSL, Fiber optic, No) |
| OnlineSecurity | Whether the customer has online security or not (Yes, No, No internet service) |
| OnlineBackup | Whether the customer has an online backup or not (Yes, No, No internet service) |
| DeviceProtection | Whether the customer has device protection or not (Yes, No, No internet service) |
| TechSupport | Whether the customer has tech support or not (Yes, No, No internet service) |
| StreamingTV | Whether the customer has streaming TV or not (Yes, No, No internet service) |
| StreamingMovies | Whether the customer has streaming movies or not (Yes, No, No internet service) |
| Contract | The contract term of the customer (Month-to-month, One year, Two years) |
| PaperlessBilling | Whether the customer has paperless billing or not (Yes, No) |
| PaymentMethod | The customer’s payment method (Electronic check, Mailed check, Bank transfer (automatic), Credit card (automatic)) |
| MonthlyCharges | The amount charged to the customer monthly |
| TotalCharges | The total amount charged to the customer |
| Churn | Whether the customer churned or not (Yes or No) |
# Raw (human-readable) data, used later to report customer IDs next to predictions.
Data = pd.read_csv('telco-customer-churn/WA_Fn-UseC_-Telco-Customer-Churn_clean.csv')
# Standardized/encoded version of the same data, used for modelling.
df = pd.read_csv('telco-customer-churn/WA_Fn-UseC_-Telco-Customer-Churn_clean_STD.csv')
# Name of the label column.
Target = 'Churn'
def Header(Text, L = 100, C1 = Back.BLUE, C2 = Fore.BLUE):
    """Print Text as a coloured section header, padded with '=' up to L columns."""
    title = C1 + Fore.WHITE + Style.NORMAL + Text + Style.RESET_ALL
    rule = C2 + Style.NORMAL + '=' * (L - len(Text) - 1) + Style.RESET_ALL
    print(title + ' ' + rule)
def Line(L=100, C = Fore.BLUE): print(C + Style.NORMAL + L*'=' + Style.RESET_ALL)
def Search_List(Key, List): return [s for s in List if Key in s]
def Table1(Inp, Feat = Target):
    """Tabulate the class counts and percentages of column Feat in Inp.

    Returns a DataFrame with columns [Feat, 'Number of Instances', 'Percentage'].
    """
    counts = Inp[Feat].value_counts()
    Out = counts.to_frame('Number of Instances').reset_index()
    Out = Out.rename(columns = {'index': Feat})
    share = Out['Number of Instances'].values / Out['Number of Instances'].sum()
    Out['Percentage'] = np.round(100 * share, 2)
    return Out
# Feature matrix / label vector for the model (standardized data).
X = df.drop(columns = Target).values
y = df[Target].values
# Display names for the binary classes (0 -> 'No', 1 -> 'Yes').
Labels = ['No', 'Yes']
Test_Size = 0.3
# Stratified split keeps the churn ratio the same in train and test sets.
sss = StratifiedShuffleSplit(n_splits=1, test_size=Test_Size, random_state=42)
_ = sss.get_n_splits(X, y)
for train_index, test_index in sss.split(X, y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
del sss
Header('Train Set')
Temp = Table1(df, Feat = Target)
# Overwrite the full-data counts with the train-split counts.
# NOTE(review): assumes value_counts() row order matches np.unique()'s sorted
# class order — TODO confirm for this data.
_, Temp['Number of Instances'] = np.unique(y_train, return_counts=True)
Temp['Percentage'] = np.round(100* Temp['Number of Instances'].values /Temp['Number of Instances'].sum(), 2)
Temp[Target] = Temp[Target].map(lambda x: Labels[0] if x ==0 else Labels[1])
display(Temp.style.format({'Percentage': "{:.2f}"}).hide_index())
Header('Test Set')
Temp = Table1(df, Feat = Target)
# Same table for the test split.
_, Temp['Number of Instances'] = np.unique(y_test, return_counts=True)
Temp['Percentage'] = np.round(100* Temp['Number of Instances'].values /Temp['Number of Instances'].sum(), 2)
Temp[Target] = Temp[Target].map(lambda x: Labels[0] if x ==0 else Labels[1])
display(Temp.style.format({'Percentage': "{:.2f}"}).hide_index())
del Temp
Line()
Train Set ==========================================================================================
| Churn | Number of Instances | Percentage |
|---|---|---|
| No | 3622 | 73.47 |
| Yes | 1308 | 26.53 |
Test Set ===========================================================================================
| Churn | Number of Instances | Percentage |
|---|---|---|
| No | 1552 | 73.45 |
| Yes | 561 | 26.55 |
====================================================================================================
A multi-layer perceptron (MLP) is a class of feedforward artificial neural network (ANN). At each iteration, the algorithm measures the error with the cross-entropy loss, then computes the gradients and updates the model parameters. At the end of this iterative process, the predicted labels agree more closely with the true labels, since the error is lower than at the first step.
# Setting up Tensor Arrays
# Move the split arrays onto the GPU when one is available; labels are cast to
# LongTensor as required by CrossEntropyLoss.
if torch.cuda.is_available():
    X_train_tensor = Variable(torch.from_numpy(X_train).cuda())
    y_train_tensor = Variable(torch.from_numpy(y_train).type(torch.LongTensor).cuda())
    X_test_tensor = Variable(torch.from_numpy(X_test).cuda())
    y_test_tensor = Variable(torch.from_numpy(y_test).type(torch.LongTensor).cuda())
else:
    X_train_tensor = Variable(torch.from_numpy(X_train))
    y_train_tensor = Variable(torch.from_numpy(y_train).type(torch.LongTensor))
    X_test_tensor = Variable(torch.from_numpy(X_test))
    y_test_tensor = Variable(torch.from_numpy(y_test).type(torch.LongTensor))
Batch_size = 100
# Derive the number of epochs from a fixed total budget of parameter updates.
iteration_number = int(2e4)
epochs_number = int(iteration_number / (len(X_train) / Batch_size))
# Pytorch train and test sets
Train_set = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
Test_set = torch.utils.data.TensorDataset(X_test_tensor, y_test_tensor)
# data loader
train_loader = torch.utils.data.DataLoader(Train_set, batch_size = Batch_size, shuffle = False)
# BUG FIX: the test loader previously wrapped Train_set, so every "test"
# evaluation during training actually measured train accuracy.
test_loader = torch.utils.data.DataLoader(Test_set, batch_size = Batch_size, shuffle = False)
# Model
class MLP_Model(nn.Module):
    """Five-layer fully connected classifier.

    Architecture: Linear-ReLU, Linear-Tanh, Linear-ELU, Linear-ELU, Linear
    readout. The readout has no activation; raw class scores (logits) are
    returned.
    """

    def __init__(self, input_Size, hidden_Size, output_Size):
        super(MLP_Model, self).__init__()
        # Layer 1: input -> hidden, ReLU
        self.fc1 = nn.Linear(input_Size, hidden_Size)
        self.relu1 = nn.ReLU()
        # Layer 2: hidden -> hidden, Tanh
        self.fc2 = nn.Linear(hidden_Size, hidden_Size)
        self.tanh2 = nn.Tanh()
        # Layer 3: hidden -> hidden, ELU
        self.fc3 = nn.Linear(hidden_Size, hidden_Size)
        self.elu3 = nn.ELU()
        # Layer 4: hidden -> hidden, ELU
        self.fc4 = nn.Linear(hidden_Size, hidden_Size)
        self.elu4 = nn.ELU()
        # Layer 5: hidden -> output logits (no activation on the readout)
        self.fc5 = nn.Linear(hidden_Size, output_Size)

    def forward(self, x):
        """Map a batch of feature vectors to raw class scores (logits)."""
        out = x
        # Pass the input through each linear layer and its activation in turn;
        # the final readout layer stays linear.
        for stage in (self.fc1, self.relu1, self.fc2, self.tanh2,
                      self.fc3, self.elu3, self.fc4, self.elu4, self.fc5):
            out = stage(out)
        return out
# Network dimensions: one input per feature, one output per class.
input_Size, output_Size = len(X[0]), len(np.unique(y))
hidden_Size = 64
# model
model = MLP_Model(input_Size, hidden_Size, output_Size)
# GPU
if torch.cuda.is_available():
    model.cuda()
# Cross Entropy Loss (applied to the raw logits returned by the model)
CEL= nn.CrossEntropyLoss()
# Optimizer: plain SGD on all model parameters.
learning_rate = 1e-2
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# Training the Model
Count = 0
Loss_list = []
Iteration_list = []
Accuracy_list = []
# Evaluate and record metrics every `Steps` parameter updates.
Steps = 10
Progress_Bar = progressbar.ProgressBar(maxval= iteration_number + 300,
                                       widgets=[progressbar.Bar('=', '|', '|'),
                                                progressbar.Percentage()])
for epoch in range(epochs_number):
    for i, (Xtr, ytr) in enumerate(train_loader):
        # Variables
        Xtr = Variable(Xtr.view(-1, X[0].shape[0]))
        ytr = Variable(ytr)
        # Set all gradients to zero
        optimizer.zero_grad()
        # Forward
        Out = model(Xtr.float())
        # loss
        loss = CEL(Out, ytr.long())
        # Backward (Calculating the gradients)
        loss.backward()
        # Update parameters
        optimizer.step()
        Count += 1
        del Xtr, ytr
        # Periodic evaluation on the held-out loader
        if Count % Steps == 0:
            # Calculate Accuracy
            Correct, Total = 0, 0
            # no_grad: inference only — skips building the autograd graph,
            # which saves memory/time and does not change the metrics.
            with torch.no_grad():
                for Xts, yts in test_loader:
                    Xts = Variable(Xts.view(-1, X[0].shape[0]))
                    # Forward
                    Out = model(Xts.float())
                    # Predicted class = index of the maximum logit
                    Predicted = torch.max(Out.data, 1)[1]
                    # Total number of yts
                    Total += len(yts)
                    # Total Correct predictions
                    Correct += (Predicted == yts).sum()
                    del Xts, yts
            # storing loss and iteration
            Loss_list.append(loss.data)
            Iteration_list.append(Count)
            Accuracy_list.append(Correct / float(Total))
            Progress_Bar.update(Count)
Progress_Bar.finish()
history = pd.DataFrame({'Iteration': np.array(Iteration_list),
                        'Loss': np.array([x.cpu().data.numpy() for x in Loss_list]),
                        'Accuracy': np.array([x.cpu().data.numpy() for x in Accuracy_list])})
del Loss_list, Iteration_list, Accuracy_list
|=========================================================================|100%
def Plot_history(history, Table_Rows = 25, yLim = 2):
    """Plot loss/accuracy curves (left) next to a sampled history table (right).

    history: DataFrame with 'Iteration', 'Loss' and 'Accuracy' columns.
    Table_Rows: number of evenly spaced history rows shown in the table.
    yLim: upper bound of the left plot's y axis.
    """
    fig = make_subplots(rows=1, cols=2, horizontal_spacing = 0.02, column_widths=[0.6, 0.4],
                        specs=[[{"type": "scatter"},{"type": "table"}]])
    # Left: loss and accuracy vs. iteration number.
    fig.add_trace(go.Scatter(x= history['Iteration'].values, y= history['Loss'].astype(float).values.round(4),
                             line=dict(color='OrangeRed', width= 1.5), name = 'Loss'), 1, 1)
    fig.add_trace(go.Scatter(x= history['Iteration'].values, y= history['Accuracy'].astype(float).values,
                             line=dict(color='MidnightBlue', width= 1.5), name = 'Accuracy'), 1, 1)
    fig.update_layout(legend=dict(x=0, y=1.1, traceorder='reversed', font_size=12),
                      dragmode='select', plot_bgcolor= 'white', height=600, hovermode='closest',
                      legend_orientation='h')
    fig.update_xaxes(range=[history.Iteration.min(), history.Iteration.max()],
                     showgrid=True, gridwidth=1, gridcolor='Lightgray',
                     showline=True, linewidth=1, linecolor='Lightgray', mirror=True, row=1, col=1)
    fig.update_yaxes(range=[0, yLim], showgrid=True, gridwidth=1, gridcolor='Lightgray',
                     showline=True, linewidth=1, linecolor='Lightgray', mirror=True, row=1, col=1)
    # Right: Table_Rows evenly spaced rows of the history, plus the final row.
    ind = np.linspace(0, history.shape[0], Table_Rows, endpoint = False).round(0).astype(int)
    # BUG FIX: previously the last *Iteration value* (e.g. 20000) was appended
    # to this list of positional *index* labels, so the final history row was
    # never actually included in the table. Append the last index label instead.
    ind = np.append(ind, history.index.values[-1])
    history = history[history.index.isin(ind)]
    fig.add_trace(go.Table(header=dict(values = list(history.columns), line_color='darkslategray',
                                       fill_color='DimGray', align=['center','center'],
                                       font=dict(color='white', size=12), height=25), columnwidth = [0.4, 0.4, 0.4, 0.4],
                           cells=dict(values=[history.Iteration, history.Loss.astype(float).round(4).values,
                                              history.Accuracy.astype(float).round(4).values],
                                      line_color='darkslategray', fill=dict(color=['WhiteSmoke', 'white']),
                                      align=['center', 'center'], font_size=12,height=20)), 1, 2)
    fig.show()
Plot_history(history, Table_Rows = 18, yLim = 1)
The confusion matrix allows for visualization of the performance of an algorithm.
def Confusion_Matrix(Model, FG = (12, 4), X_train_tensor = X_train_tensor, y_train = y_train,
                     X_test_tensor = X_test_tensor, y_test = y_test):
    """Plot raw and row-normalized confusion matrices for the train and test sets.

    Model: trained classifier returning class logits.
    FG: figure size of each 1x2 panel.
    Returns (CM_Train, CM_Test): raw-count confusion matrices for both splits.
    """
    def _panel(X_tensor, y_true, title):
        # Draw one figure: raw counts (left, Blues) and row-normalized
        # percentages (right, Greens); return the raw-count matrix.
        font = FontProperties()
        font.set_weight('bold')
        fig, ax = plt.subplots(1, 2, figsize=FG)
        _ = fig.suptitle(title, fontproperties=font, fontsize = 16)
        # Predictions: class = argmax of the model's logits.
        # BUG FIX: use the Model argument instead of the global `model`,
        # which the original silently relied on.
        y_pred = Model(X_tensor.float())
        y_pred = torch.max(y_pred.data, 1)[1]
        y_pred = y_pred.cpu().data.numpy()
        CM = metrics.confusion_matrix(y_true, y_pred)
        _ = sns.heatmap(CM.round(2), annot=True, annot_kws={"size": 14}, cmap="Blues", ax = ax[0])
        _ = ax[0].set_title('Confusion Matrix')
        # Normalize each row by its true-class count.
        Norm = CM.astype('float') / CM.sum(axis=1)[:, np.newaxis]
        _ = sns.heatmap(Norm.round(2), annot=True, annot_kws={"size": 14}, cmap="Greens", ax = ax[1],
                        linewidths = 0.2, vmin=0, vmax=1, cbar_kws={"shrink": 1})
        _ = ax[1].set_title('Normalized Confusion Matrix')
        for a in ax:
            _ = a.set_xlabel('Predicted labels')
            _ = a.set_ylabel('True labels')
            _ = a.xaxis.set_ticklabels(Labels)
            _ = a.yaxis.set_ticklabels(Labels)
            _ = a.set_aspect(1)
        return CM
    ############# Train Set #############
    CM_Train = _panel(X_train_tensor, y_train, 'Train Set')
    ############# Test Set #############
    CM_Test = _panel(X_test_tensor, y_test, 'Test Set')
    return CM_Train, CM_Test
A confusion matrix allows the visualization of the performance of a classification model.
CM_Train, CM_Test = Confusion_Matrix(model)
Note that:
where $T_p$, $T_n$, $F_p$, and $F_n$ represent true positive, true negative, false positive, and false negative, respectively.
However, the accuracy can be a misleading metric for imbalanced data sets. Here, about 73 percent of the sample has negative (No) and about 27 percent has positive (Yes) values. In these cases, a balanced accuracy (bACC) [4] is recommended that normalizes true positive and true negative predictions by the number of positive and negative samples, respectively, and divides their sum by two:
\begin{align} \text{TPR} &= \frac{T_p}{T_p + F_n},\\ \text{TNR} &= \frac{T_n}{T_n + F_p},\\ \text{Balanced Accuracy (bACC)} &= \frac{1}{2}\left(\text{TPR}+\text{TNR}\right) \end{align}Header('Train Set')
# Derived classification metrics for the train split ('Yes' is the positive class).
tn, fp, fn, tp = CM_Train.ravel()
Precision = tp/(tp+fp)
Recall = tp/(tp + fn)
# True-positive rate (identical to recall) and true-negative rate.
TPR = tp/(tp +fn)
TNR = tn/(tn +fp)
# Balanced accuracy: mean of TPR and TNR.
BA = (TPR + TNR)/2
# Predicted positive condition rate.
# NOTE(review): PPCR is computed here and below but never printed.
PPCR = (tp + fp)/(tp + fp + tn+ fn)
print('Precision (Train) = %.2f' % Precision)
print('Recall (Train) = %.2f' % Recall)
print('TPR (Train) = %.2f' % TPR)
print('TNR (Train) = %.2f' % TNR)
print('Balanced Accuracy (Train) = %.2f' % BA)
Header('Test Set')
# Same metrics for the test split.
tn, fp, fn, tp = CM_Test.ravel()
Precision = tp/(tp+fp)
Recall = tp/(tp + fn)
TPR = tp/(tp +fn)
TNR = tn/(tn +fp)
BA = (TPR + TNR)/2
PPCR = (tp + fp)/(tp + fp + tn+ fn)
print('Precision (Test) = %.2f' % Precision)
print('Recall (Test) = %.2f' % Recall)
print('TPR (Test) = %.2f' % TPR)
print('TNR (Test) = %.2f' % TNR)
print('Balanced Accuracy (Test) = %.2f' % BA)
del tn, fp, fn, tp, Precision, Recall, TPR, TNR, BA, PPCR
Line()
Train Set ========================================================================================== Precision (Train) = 0.70 Recall (Train) = 0.79 TPR (Train) = 0.79 TNR (Train) = 0.88 Balanced Accuracy (Train) = 0.83 Test Set =========================================================================================== Precision (Test) = 0.52 Recall (Test) = 0.60 TPR (Test) = 0.60 TNR (Test) = 0.80 Balanced Accuracy (Test) = 0.70 ====================================================================================================
Now, for any given dataset, we can predict churn:
# Score a random 10% sample of customers with the trained model.
Sample = df.sample(frac = 0.1)
X_sample = Sample.drop(columns = [Target]).values
if torch.cuda.is_available():
    X_sample_tensor = Variable(torch.from_numpy(X_sample).cuda())
else:
    X_sample_tensor = Variable(torch.from_numpy(X_sample))
# Predicted class = argmax over the logits, mapped to 'No'/'Yes'.
y_pred = model(X_sample_tensor.float())
y_pred = np.asarray(y_pred.cpu().detach().numpy())
y_pred = pd.Series(y_pred.argmax(axis=1)).to_frame('Churn (Predicted)').applymap(lambda x: Labels[0] if x ==0 else Labels[1])
# Join the predictions with the human-readable IDs and true labels from the raw data.
Predictions = pd.concat([Data.loc[Sample.index, ['customer ID','Churn']].reset_index(drop = True), y_pred], axis = 1)
Predictions['Churn'] = Predictions['Churn'].map(lambda x: Labels[0] if x ==0 else Labels[1])
display(Predictions.head(15))
| customer ID | Churn | Churn (Predicted) | |
|---|---|---|---|
| 0 | 2003-CKLOR | Yes | No |
| 1 | 5887-IKKYO | No | No |
| 2 | 5879-HMFFH | No | No |
| 3 | 4232-JGKIY | No | Yes |
| 4 | 0156-FVPTA | Yes | No |
| 5 | 2361-FJWNO | No | No |
| 6 | 0876-WDUUZ | Yes | Yes |
| 7 | 7047-YXDMZ | No | No |
| 8 | 1682-VCOIO | No | No |
| 9 | 0439-IFYUN | No | Yes |
| 10 | 1265-ZFOSD | No | No |
| 11 | 7075-BNDVQ | No | No |
| 12 | 4884-TVUQF | No | No |
| 13 | 9073-ZZIAY | No | No |
| 14 | 2868-MZAGQ | Yes | Yes |
Although the model is doing pretty well considering the complexity of this problem, we can improve the results by designing an iterative optimization that utilizes the accuracy and recall scores.